[ANT-745] Complete Lever Integration #2

Merged
merged 7 commits into from
Jan 2, 2024

@marxlow marxlow commented Oct 9, 2023

Requires setting up connectors with this initial state. 631152000000 is 1990-01-01 in epoch milliseconds; we are essentially initializing state for the first time, which is necessary for our current implementation of incremental sub-streams.

[
  {
    "streamDescriptor": {
      "name": "opportunities"
    },
    "streamState": {
      "updatedAt": 631152000000
    }
  }
]
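As a quick check of the value above, the following minimal sketch converts the cursor from epoch milliseconds to a UTC date and rebuilds the same initial state. The helper names `ms_to_utc` and `initial_state` are hypothetical and not part of the connector.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List

INITIAL_CURSOR_MS = 631152000000  # value quoted in the comment above


def ms_to_utc(ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def initial_state(stream_name: str, cursor_ms: int) -> List[Dict[str, Any]]:
    """Build a state payload shaped like the JSON above (hypothetical helper)."""
    return [
        {
            "streamDescriptor": {"name": stream_name},
            "streamState": {"updatedAt": cursor_ms},
        }
    ]


if __name__ == "__main__":
    # Print the calendar date the cursor corresponds to, then the state payload.
    print(ms_to_utc(INITIAL_CURSOR_MS).date().isoformat())
    print(json.dumps(initial_state("opportunities", INITIAL_CURSOR_MS), indent=2))
```

Starting the cursor far in the past means the first incremental run behaves like a full read while still going through the incremental code path.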

github-actions bot commented Oct 9, 2023

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file with any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

Comment on lines 223 to 230
    def parse_response(self, response: requests.Response, stream_slice:[Mapping[str, Any]], **kwargs) -> Iterable[Mapping]:
        records = response.json()["data"]
        if not records:
            records = [{}]

        for record in records:
            record["opportunity"] = stream_slice["opportunity_id"]
        yield from records
Author

The fix for causing heartbeat issues.

Can you add a comment explaining the fix?
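For readers following this thread, here is a minimal sketch of the pattern in the hunk above, written without the Airbyte CDK so that it runs on its own. It assumes the intent is that every slice yields at least one record, so a sub-stream that returns no data still emits something; the link to heartbeat issues is the author's description, not something this sketch verifies. The function name `records_for_slice` is hypothetical.

```python
from typing import Any, Dict, Iterable, List, Mapping


def records_for_slice(
    payload: Mapping[str, Any], stream_slice: Mapping[str, Any]
) -> Iterable[Dict[str, Any]]:
    """Yield the records of one slice, tagging each with its parent opportunity.

    If the API returned no data for this slice, a single placeholder record is
    emitted so the slice never produces zero records (assumed intent of the fix).
    """
    records: List[Dict[str, Any]] = list(payload.get("data") or [])
    if not records:
        records = [{}]  # placeholder that carries only the parent id

    for record in records:
        record["opportunity"] = stream_slice["opportunity_id"]
    yield from records


if __name__ == "__main__":
    empty = records_for_slice({"data": []}, {"opportunity_id": "abc"})
    print(list(empty))
```

A consequence of this design is that downstream consumers see rows containing only the `opportunity` key, which may need filtering.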

Collaborator

@junquanlim junquanlim left a comment

LGTM

@NatElkins

I'm going to take a longer look at this later, hopefully today. But can you explain why we have fully forked the repo? I am not against it, just want to understand the reasoning (as opposed to simply using the SDK in a separate repo, if such a thing is possible).

@NatElkins

I just worry about it being a maintenance burden.

@NatElkins NatElkins left a comment

I just gave this a quick first pass. Would love to touch base on this later this evening (your morning) for some synchronous review.

Comment on lines 91 to 96
# 1797 opportunities
# 1 record read
# stream_params = {"confidentiality": "all", "expand": "contact", "stage_id": "e54475bb-d3ad-43ff-b8b9-76c4fc38e78c" }

# 8311 opportunities
# stream_params = {"confidentiality": "all", "expand": "contact", "stage_id": "3a255cc8-0732-4bee-92bd-62acfec3572c" }

Dead code?

Author

Removed :)

Comment on lines 174 to 200
# TODO: Basic incremental stream
class IncrementalLeverStream(LeverStream, ABC):
    """
    TODO fill in details of this class to implement functionality related to incremental syncs for your connector.
    if you do not need to implement incremental sync for any streams, remove this class.
    """

    # TODO: Fill in to checkpoint stream reads after N records. This prevents re-reading of data if the stream fails for any reason.
    state_checkpoint_interval = None

    @property
    def cursor_field(self) -> str:
        """
        TODO
        Override to return the cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. This is
        usually id or date based. This field's presence tells the framework this in an incremental stream. Required for incremental.

        :return str: The name of the cursor field.
        """
        return []

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Override to determine the latest state after reading the latest record. This typically compared the cursor_field from the latest record and
        the current state and picks the 'most' recent cursor. This is how a stream's state is determined. Required for incremental.
        """
        return {}

If we are not using this, should we delete it? If we want to keep it around because we plan to implement it later, can we improve the comment at the top?

Author

Good catch but I will keep it for the next upcoming PR to add incremental extracts
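To make the placeholder discussed in this thread concrete, here is a minimal sketch of what its docstring describes: the new state keeps whichever cursor value is more recent. It is written as a plain function rather than a CDK subclass, and it assumes an integer `updatedAt` cursor in epoch milliseconds, matching the initial state in the PR description. It is not the implementation planned for the follow-up PR.

```python
from typing import Any, Dict, Mapping

CURSOR_FIELD = "updatedAt"  # assumption: same key as in the initial state


def get_updated_state(
    current_stream_state: Mapping[str, Any], latest_record: Mapping[str, Any]
) -> Dict[str, Any]:
    """Return a state holding the larger of the stored and the record cursor."""
    current = current_stream_state.get(CURSOR_FIELD, 0)
    latest = latest_record.get(CURSOR_FIELD, 0)
    return {CURSOR_FIELD: max(current, latest)}


if __name__ == "__main__":
    state: Dict[str, Any] = {}
    # Records may arrive out of order; the state only ever moves forward.
    for record in [{"updatedAt": 5}, {"updatedAt": 3}, {"updatedAt": 9}]:
        state = get_updated_state(state, record)
    print(state)
```

Taking the maximum rather than the last value seen keeps the cursor from moving backwards when records are not returned in cursor order.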

    def path(self, stream_slice: Mapping[str, any] = None, **kwargs) -> str:
        return f"opportunities/{stream_slice['opportunity_id']}/{self.name}"

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:

Is it permitted to type Any as a more specific type? Or must it match Any?

Author

Resolved this. It's permitted to type Any. Not being loose here, this is generated from the Airbyte script
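The hunk in this thread builds a sub-stream path from a slice. A minimal standalone sketch of that relationship follows: one slice per parent opportunity, and one request path per slice. The parent field `id` and the sub-stream name `offers` are assumptions made for illustration; only the `opportunity_id` key and the path format come from the hunk.

```python
from typing import Any, Dict, Iterable, List, Mapping


def stream_slices(parent_records: Iterable[Mapping[str, Any]]) -> Iterable[Dict[str, Any]]:
    """Yield one slice per parent record (assumes each parent has an 'id')."""
    for record in parent_records:
        yield {"opportunity_id": record["id"]}


def path(name: str, stream_slice: Mapping[str, Any]) -> str:
    """Build the request path for a sub-stream, as in the hunk above."""
    return f"opportunities/{stream_slice['opportunity_id']}/{name}"


if __name__ == "__main__":
    parents: List[Dict[str, Any]] = [{"id": "a1"}, {"id": "b2"}]
    for stream_slice in stream_slices(parents):
        print(path("offers", stream_slice))
```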

    def parse_response(self, response: requests.Response, stream_slice:Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        yield from response.json()["data"]

class Opportunities(LeverStream):

Author

Yeap added

        return {"offset": response_data["next"]}

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None

Should any be Any here?

Author

Yup, resolved
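The hunk in this thread shows the two halves of offset pagination: a token taken from the response's `next` field, and request parameters that carry it into the following call. Below is a minimal sketch of how the two fit together, written as plain functions. It assumes that a missing or empty `next` means there are no more pages; the base parameters are borrowed from the commented-out example earlier in the review.

```python
from typing import Any, Dict, Mapping, Optional


def next_page_token(response_data: Mapping[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the token for the next page, or None when there is none (assumed)."""
    next_offset = response_data.get("next")
    if not next_offset:
        return None
    return {"offset": next_offset}


def request_params(
    base_params: Mapping[str, Any], page_token: Optional[Mapping[str, Any]] = None
) -> Dict[str, Any]:
    """Merge the pagination token, if any, into the base query parameters."""
    params = dict(base_params)
    if page_token:
        params.update(page_token)
    return params


if __name__ == "__main__":
    base = {"confidentiality": "all", "expand": "contact"}
    token = next_page_token({"data": [], "next": "cursor-123"})
    print(request_params(base, token))
```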

        # super().__init__(**kwargs)
        # self._start_date = start_date

    def path(self, stream_slice: Mapping[str, any] = None, **kwargs) -> str:

Should this be Any?

Author

Yeap, resolved
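For context on why these review comments matter: lowercase `any` is Python's built-in function, while `typing.Any` is the type meant in annotations. The sketch below shows the corrected hints on a hypothetical stand-in for `path`. The `Optional[...]` wrapper and the fallback return value for a missing slice are assumptions added for illustration, not part of the PR.

```python
import builtins
from typing import Any, Mapping, Optional, get_type_hints


def path(stream_slice: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> str:
    """Hypothetical stand-in for the method under review, with corrected hints."""
    if stream_slice is None:
        return "opportunities"  # assumed fallback, for illustration only
    return f"opportunities/{stream_slice['opportunity_id']}"


if __name__ == "__main__":
    # Lowercase `any` is the built-in function, not a typing construct.
    print(any is builtins.any)
    # The resolved hints show typing.Any in the mapping's value position.
    print(get_type_hints(path)["stream_slice"])
```

A static type checker is the tool that would normally catch `Mapping[str, any]`; at runtime the mistake tends to pass unnoticed, which is probably why it survived until review.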

Comment on lines +1 to +4
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

@NatElkins NatElkins Oct 11, 2023

Is this file actually being used?

Author

Nope, but I'll leave it in place so we can write tests in it.

source = SourceLever()
config_mock = MagicMock()
streams = source.streams(config_mock)
# TODO: replace this with your streams number

Dead code

Author

Resolved

config_mock = MagicMock()
streams = source.streams(config_mock)
# TODO: replace this with your streams number
expected_streams_number = 2

Is this assertion correct?

Author

Fixed :)
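One way to make this kind of assertion harder to get wrong is to compare stream names rather than a bare count. The sketch below uses a hypothetical `FakeSourceLever` stand-in so it runs without the connector installed; the stream name `offers` is an assumption, and the real test would import `SourceLever` instead.

```python
from typing import Any, List
from unittest.mock import MagicMock


class FakeStream:
    """Hypothetical stand-in for a connector stream; only the name matters here."""

    def __init__(self, name: str) -> None:
        self.name = name


class FakeSourceLever:
    """Hypothetical stand-in for SourceLever, returning two streams."""

    def streams(self, config: Any) -> List[FakeStream]:
        return [FakeStream("opportunities"), FakeStream("offers")]


def test_streams() -> None:
    source = FakeSourceLever()
    config_mock = MagicMock()
    streams = source.streams(config_mock)
    expected_stream_names = {"opportunities", "offers"}
    # Comparing names fails with a readable diff when a stream is added or removed.
    assert {stream.name for stream in streams} == expected_stream_names
    assert len(streams) == len(expected_stream_names)


if __name__ == "__main__":
    test_streams()
    print("ok")
```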

    def parse_response(self, response: requests.Response, stream_slice:Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
        yield from response.json()["data"]

    """
Author

Important - need to pay special attention here @junquanlim

@marxlow marxlow merged commit d003f19 into main Jan 2, 2024
7 of 15 checks passed
@marxlow marxlow deleted the ANTS-745 branch January 2, 2024 06:22
3 participants